From 395b317a5053bba91c8a31f2730bc70e757f6748 Mon Sep 17 00:00:00 2001 From: "mjw@wray-m-3.hpl.hp.com" Date: Tue, 10 Aug 2004 15:53:15 +0000 Subject: [PATCH] bitkeeper revision 1.1159.1.35 (4118ef6bM1GarM_w6aDNImKG_0GSRw) Improve error handling for save, and compress the file. --- tools/python/xen/xend/XendMigrate.py | 37 +++++++++++----- tools/xfrd/xfrd.c | 65 ++++++++++++++++------------ 2 files changed, 62 insertions(+), 40 deletions(-) diff --git a/tools/python/xen/xend/XendMigrate.py b/tools/python/xen/xend/XendMigrate.py index 6eaf0d1fba..1ee32f90c0 100644 --- a/tools/python/xen/xend/XendMigrate.py +++ b/tools/python/xen/xend/XendMigrate.py @@ -14,7 +14,8 @@ from twisted.internet.protocol import ClientFactory import sxp import XendDB import EventServer; eserver = EventServer.instance() - +from XendError import XendError + """The port for the migrate/save daemon xfrd.""" XFRD_PORT = 8002 @@ -77,12 +78,17 @@ class XfrdClientFactory(ClientFactory): def clientConnectionFailed(self, connector, reason): print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason + self.xinfo.error(reason) class XfrdInfo: """Abstract class for info about a session with xfrd. Has subclasses for save and migrate. """ + """Suspend timeout (seconds). + We set a timeout because suspending a domain can hang.""" + timeout = 30 + def __init__(self): from xen.xend import XendDomain self.xd = XendDomain.instance() @@ -91,19 +97,18 @@ class XfrdInfo: self.paused = {} def vmconfig(self): - print 'vmconfig>' dominfo = self.xd.domain_get(self.src_dom) - print 'vmconfig>', type(dominfo), dominfo if dominfo: val = sxp.to_string(dominfo.sxpr()) else: val = None - print 'vmconfig<', 'val=', type(val), val return val def error(self, err): + print 'Error>', err self.state = 'error' if not self.deferred.called: + print 'Error> calling errback' self.deferred.errback(err) def dispatch(self, xfrd, val): @@ -115,6 +120,7 @@ class XfrdInfo: def cberr(err): v = ['xfr.err', errno.EINVAL] sxp.show(v, out=xfrd.transport) + self.error(err) op = sxp.name(val) op = op.replace('.', '_') @@ -144,10 +150,6 @@ class XfrdInfo: if not err: return self.error(err); xfrd.loseConnection() - #try: - # self.xd.domain_unpause(self.src_dom) - #except: - # print >>sys.stdout, "Error unpausing domain:", self.src_dom return None def xfr_progress(self, xfrd, val): @@ -178,6 +180,8 @@ class XfrdInfo: def xfr_vm_suspend(self, xfrd, val): """Suspend a domain. Suspending takes time, so we return a Deferred that is called when the suspend completes. + Suspending can hang, so we set a timeout and fail if it + takes too long. """ print 'xfr_vm_suspend>', val try: @@ -185,17 +189,17 @@ class XfrdInfo: d = defer.Deferred() # Subscribe to 'suspended' events so we can tell when the # suspend completes. Subscribe to 'died' events so we can tell if - # the domain died. + # the domain died. Set a timeout and error handler so the subscriptions + # will be cleaned up if suspending hangs or there is an error. def onSuspended(e, v): - print 'onSuspended>', e, v + print 'xfr_vm_suspend>onSuspended>', e, v if v[1] != vmid: return subscribe(on=0) d.callback(v) def onDied(e, v): - print 'onDied>', e, v + print 'xfr_vm_suspend>onDied>', e, v if v[1] != vmid: return - subscribe(on=0) d.errback(XendError('Domain died')) def subscribe(on=1): @@ -206,9 +210,16 @@ class XfrdInfo: action('xend.domain.suspended', onSuspended) action('xend.domain.died', onDied) + def cberr(err): + print 'xfr_vm_suspend>cberr>', err + subscribe(on=0) + return err + subscribe() val = self.xd.domain_shutdown(vmid, reason='suspend') self.suspended[vmid] = 1 + d.addErrback(cberr) + d.setTimeout(self.timeout) return d except: val = errno.EINVAL @@ -278,6 +289,7 @@ class XendMigrateInfo(XfrdInfo): eserver.inject('xend.migrate.ok', self.sxpr()) else: self.state = 'error' + self.error(XendError("save failed")) eserver.inject('xend.migrate.error', self.sxpr()) class XendSaveInfo(XfrdInfo): @@ -320,6 +332,7 @@ class XendSaveInfo(XfrdInfo): eserver.inject('xend.save.ok', self.sxpr()) else: self.state = 'error' + self.error(XendError("save failed")) eserver.inject('xend.save.error', self.sxpr()) diff --git a/tools/xfrd/xfrd.c b/tools/xfrd/xfrd.c index 301674b1fe..1b8664b7da 100644 --- a/tools/xfrd/xfrd.c +++ b/tools/xfrd/xfrd.c @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include #include #include @@ -33,6 +35,7 @@ #include "file_stream.h" #include "string_stream.h" #include "lzi_stream.h" +#include "gzip_stream.h" #include "sys_net.h" #include "sys_string.h" @@ -46,7 +49,7 @@ #include "select.h" #define MODULE_NAME "XFRD" -#define DEBUG 1 +#define DEBUG 0 #include "debug.h" /* @@ -324,8 +327,7 @@ void set_defaults(Args *args){ int stringof(Sxpr exp, char **s){ int err = 0; - dprintf(">\n"); - objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n"); + //dprintf(">\n"); objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n"); if(ATOMP(exp)){ *s = atom_name(exp); } else if(STRINGP(exp)){ @@ -334,7 +336,7 @@ int stringof(Sxpr exp, char **s){ err = -EINVAL; *s = NULL; } - dprintf("< err=%d s=%s\n", err, *s); + //dprintf("< err=%d s=%s\n", err, *s); return err; } @@ -342,8 +344,7 @@ int intof(Sxpr exp, int *v){ int err = 0; char *s; unsigned long l; - dprintf(">\n"); - objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n"); + //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n"); if(INTP(exp)){ *v = OBJ_INT(exp); } else { @@ -353,7 +354,7 @@ int intof(Sxpr exp, int *v){ *v = (int)l; } exit: - dprintf("< err=%d v=%d\n", err, *v); + //dprintf("< err=%d v=%d\n", err, *v); return err; } @@ -361,8 +362,7 @@ int addrof(Sxpr exp, uint32_t *v){ char *h; unsigned long a; int err = 0; - dprintf(">\n"); - objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n"); + //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n"); err = stringof(exp, &h); if(err) goto exit; if(get_host_address(h, &a)){ @@ -371,15 +371,14 @@ int addrof(Sxpr exp, uint32_t *v){ } *v = a; exit: - dprintf("< err=%d v=%x\n", err, *v); + //dprintf("< err=%d v=%x\n", err, *v); return err; } int portof(Sxpr exp, uint16_t *v){ char *s; int err = 0; - dprintf(">\n"); - objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n"); + //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n"); if(INTP(exp)){ *v = get_ul(exp); *v = htons(*v); @@ -395,7 +394,7 @@ int portof(Sxpr exp, uint16_t *v){ *v = p; } exit: - dprintf("< err=%d v=%u\n", err, *v); + //dprintf("< err=%d v=%u\n", err, *v); return err; } @@ -468,7 +467,7 @@ int xfr_hello(Conn *conn){ err = Conn_sxpr(conn, &sxpr); if(err) goto exit; if(!sxpr_elementp(sxpr, oxfr_hello)){ - dprintf("> sxpr_elementp test failed\n"); + wprintf("> sxpr_elementp test failed\n"); err = -EINVAL; goto exit; } @@ -507,7 +506,6 @@ int xfr_send_hello(Conn *conn){ XFR_PROTO_MINOR); if(err < 0) goto exit; IOStream_flush(conn->out); - dprintf("> xfr_response...\n"); err = xfr_response(conn); exit: dprintf("< err=%d\n", err); @@ -671,7 +669,6 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po dprintf("> Xfr xfr_addr=%s:%d\n", inet_ntoa(xfr_addr), ntohs(xfr_port)); err = Conn_connect(peer, flags, xfr_addr, xfr_port); if(err) goto exit; - printf("\n"); XfrState_set_state(state, XFR_HELLO); // Send hello message. err = xfr_send_hello(peer); @@ -685,10 +682,9 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po int plain_bytes = lzi_stream_plain_bytes(zio); int comp_bytes = lzi_stream_comp_bytes(zio); float ratio = lzi_stream_ratio(zio); - dprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n", + iprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n", plain_bytes, comp_bytes, ratio); } - printf("\n"); exit: dprintf("> err=%d\n", err); if(err && !XfrState_get_err(state)){ @@ -697,7 +693,7 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po Conn_close(peer); if(!err){ t1 = time(NULL) - t0; - dprintf("> Transfer complete in %lu seconds\n", t1); + iprintf("> Transfer complete in %lu seconds\n", t1); } dprintf("> done err=%d, notifying xend...\n", err); xfr_send_done(state, xend); @@ -709,15 +705,24 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po */ int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){ int err = 0; + int flags = (O_CREAT | O_EXCL | O_WRONLY); + int mode = 0644; + int fd; IOStream *io = NULL; dprintf("> file=%s\n", file); - io = file_stream_fopen(file, "wb"); - if(!io){ - dprintf("> Failed to open %s\n", file); + fd = open(file, flags, mode); + if(fd < 0) { + eprintf("> Failed to open %s\n", file); err = -EIO; goto exit; } + io = gzip_stream_fdopen(fd, "wb1"); + if(!io){ + eprintf("> Failed to allocate gzip state for %s\n", file); + err = -ENOMEM; + goto exit; + } err = xen_domain_snd(xend, io, state->vmid, state->vmconfig, state->vmconfig_n); if(err){ err = xfr_error(xend, err); @@ -729,6 +734,9 @@ int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){ IOStream_close(io); IOStream_free(io); } + if(err){ + unlink(file); + } dprintf("< err=%d\n", err); return err; } @@ -758,7 +766,7 @@ int xfr_recv(Args *args, XfrState *state, Conn *peer){ exit: if(!err){ t1 = time(NULL) - t0; - dprintf("> Transfer complete in %lu seconds\n", t1); + iprintf("> Transfer complete in %lu seconds\n", t1); } if(err){ xfr_error(peer, err); @@ -783,14 +791,14 @@ int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){ dprintf(">\n"); err = Conn_init(conn, flags, peersock, peer_in); if(err) goto exit; - dprintf(">xfr_hello... \n"); + //dprintf(">xfr_hello... \n"); err = xfr_hello(conn); if(err) goto exit; - dprintf("> sxpr...\n"); + //dprintf("> sxpr...\n"); err = Conn_sxpr(conn, &sxpr); if(err) goto exit; - dprintf("> sxpr=\n"); - objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n"); + //dprintf("> sxpr=\n"); + //objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n"); if(sxpr_elementp(sxpr, oxfr_migrate)){ // Migrate message from xend. uint32_t addr; @@ -839,7 +847,7 @@ int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){ } else{ // Anything else is invalid. err = -EINVAL; - dprintf("> Invalid message: "); + eprintf("> Invalid message: "); objprint(iostderr, sxpr, 0); IOStream_print(iostderr, "\n"); xfr_error(conn, err); @@ -1091,6 +1099,7 @@ int main(int argc, char *argv[]){ int key = 0; int long_index = 0; + dprintf(">\n"); set_defaults(args); while(1){ key = getopt_long(argc, argv, short_opts, long_opts, &long_index); -- 2.30.2